package com.easy.mq.client;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Future;

import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.easy.mq.entry.JsonUtil;
import com.easy.mq.exception.MQException;

/**
 * Small client that publishes objects to Kafka.
 *
 * <p>Each message is converted with {@link JsonUtil#bean2Json} and handed to a
 * producer obtained from {@link KafkaProducerPool} using the properties map
 * supplied at construction time.
 *
 * <p>NOTE(review): the properties map passed to the constructor is used as-is
 * (not copied) and is mutated on every send: the target topic is stored under
 * {@link #TOPIC}. Callers sharing one map between clients should be aware of
 * this.
 */
public class KafkaClient {

	private static final Logger log = LoggerFactory.getLogger(KafkaClient.class);

	/** Key under which the topic of the current send is stored in the producer properties. */
	public static final String TOPIC = "default.topic";

	/** Producer configuration handed to {@link KafkaProducerPool}; {@code null} when the no-arg constructor was used. */
	private Map<String, Object> properties;

	/** Topic used by {@link #send(Object)}; set through {@link #setTopic(String)}. */
	private String topic;

	/**
	 * Creates a client without producer properties. Such a client cannot send
	 * messages: every send fails with an {@link MQException}.
	 */
	public KafkaClient() {

	}

	/**
	 * Creates a client that obtains its producer using the given properties.
	 *
	 * @param properties producer configuration; kept by reference and updated
	 *                   with the topic on each send
	 */
	public KafkaClient(Map<String, Object> properties) {
		this.properties = properties;
	}

	/**
	 * Sends {@code content} to the topic configured through {@link #setTopic(String)},
	 * using a randomly generated record key.
	 *
	 * @param content object to serialize and send
	 * @return the future returned by the underlying producer
	 * @throws MQException if no topic or properties are configured, or the send fails
	 */
	public Future<RecordMetadata> send(Object content) throws MQException {
		return sendMsg(topic, "", content);
	}

	/**
	 * Sends {@code content} to the given topic, using a randomly generated record key.
	 *
	 * @param topic   destination topic; must not be null or empty
	 * @param content object to serialize and send
	 * @return the future returned by the underlying producer
	 * @throws MQException if the topic is empty, no properties are configured, or the send fails
	 */
	public Future<RecordMetadata> send(String topic, Object content) throws MQException {
		return sendMsg(topic, "", content);
	}

	/**
	 * Serializes {@code content} and sends it to {@code topic}.
	 *
	 * <p>If the failure happens after a producer has been obtained, that producer
	 * is handed to {@link KafkaProducerPool#shutdown} before the exception is
	 * rethrown. Validation and serialization failures do not touch the pool.
	 *
	 * @param topic   destination topic; must not be null or empty
	 * @param key     record key; when null or empty a random UUID string is used
	 * @param content object to serialize and send
	 * @return the future returned by the underlying producer
	 * @throws MQException if the topic is empty, no properties are configured,
	 *                     or serialization / sending throws
	 */
	public Future<RecordMetadata> sendMsg(String topic, String key, Object content) throws MQException {
		// Fail fast on caller errors instead of surfacing them as an NPE or a
		// Kafka-internal exception with an unhelpful (possibly null) message.
		if (properties == null) {
			String message = "Kafka producer properties are not configured";
			log.error(message);
			throw new MQException(message);
		}
		if (StringUtils.isEmpty(topic)) {
			String message = "Kafka topic must not be null or empty";
			log.error(message);
			throw new MQException(message);
		}

		// Local on purpose: a shared field could make the catch block shut down
		// a producer obtained by an earlier (or concurrent) call.
		Producer<String, String> producer = null;
		try {
			properties.put(TOPIC, topic);

			// Serialize before acquiring the producer so that a bad payload
			// does not cause a healthy pooled producer to be shut down.
			Object payload = JsonUtil.bean2Json(content);
			String recordKey = StringUtils.isEmpty(key) ? UUID.randomUUID().toString() : key;

			producer = KafkaProducerPool.getProducer(properties);

			// Raw ProducerRecord kept as in the original: the declared return
			// type of JsonUtil.bean2Json is not visible from this file.
			@SuppressWarnings({ "unchecked", "rawtypes" })
			Future<RecordMetadata> result = producer.send(new ProducerRecord(topic, recordKey, payload));
			return result;
		} catch (Exception e) {
			log.error(e.getMessage(), e);
			if (producer != null) {
				KafkaProducerPool.shutdown(producer);
			}
			throw toMqException(e);
		}
	}

	/** Wraps {@code e} in an {@link MQException} carrying the same message and, where possible, the original cause. */
	private static MQException toMqException(Exception e) {
		MQException wrapped = new MQException(e.getMessage());
		try {
			wrapped.initCause(e);
		} catch (IllegalStateException ignored) {
			// The MQException constructor already fixed its cause; keep it as-is.
		}
		return wrapped;
	}

	/**
	 * @return the topic used by {@link #send(Object)}, or {@code null} if none was set
	 */
	public String getTopic() {
		return topic;
	}

	/**
	 * @param topic the topic to be used by {@link #send(Object)}
	 */
	public void setTopic(String topic) {
		this.topic = topic;
	}

}
